package com.data.mall.etl.service.impl;

import com.data.mall.common.KafkaTrackingDTO;
import com.data.mall.etl.domain.enums.Params;
import com.data.mall.etl.service.PromotionEventETL;
import com.data.mall.service.dto.PromoDTO;
import com.data.mall.utils.CKUtils;
import com.data.mall.utils.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.sql.SQLException;
import java.util.Map;

@Slf4j
@Service
public class PromotionEventETLImpl implements PromotionEventETL {

    @Autowired
    private CKUtils ckUtils;

    @Override
    public Object publishPromotion(KafkaTrackingDTO dto) {
        insertPromotion(dto);
        return dto;
    }

    private void insertPromotion(KafkaTrackingDTO dto) {
        Map<Params, String> param = dto.getParamList();
        PromoDTO promotion = JSON.parseObject(param.get(Params.PROMOTION), PromoDTO.class);
        String sql = "insert into datamall.dim_promotion(promotion_id,promotion_name,start_time,end_time,item_id,promotion_price) VALUES (" +
                promotion.getId() + ",'" + promotion.getPromoName() + "','" + promotion.getStartTime() + "','" + promotion.getEndTime() + "'," +
                promotion.getItemId() + ",'" + promotion.getPromoItemPrice() + "'" +
                ")";
        try {
            ckUtils.insert(sql);
        } catch (SQLException e) {
            log.error("insertMemberInfo error");
        }
    }
}
